pkg/beholder: add batch emitter service with service-engine lifecycle#2059
pkg/beholder: add batch emitter service with service-engine lifecycle#2059pkcll wants to merge 7 commits into
Conversation
|
730b714 to
fa7e617
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds lifecycle-managed Chip Ingress batch emission to Beholder and wires it into LOOP configuration/startup, while refactoring emitter ownership so the Beholder client manages service start/stop.
Changes:
- Adds
ChipIngressBatchEmitterServicewith batching, lifecycle integration, metrics, tests, and benchmarks. - Refactors
beholder.Client, noop/writer/http clients, and emitters to useservices.Enginelifecycle and injected loggers. - Adds LOOP env/config propagation for the batch emitter feature flag and starts/stops the Beholder client from the server.
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
pkg/services/service.go |
Removes obsolete lint suppression in service startup tracing. |
pkg/loop/server.go |
Starts/stops the lifecycle-managed Beholder client and passes batch-emitter config/logger. |
pkg/loop/config.go |
Adds LOOP batch-emitter env flag parsing and command env propagation. |
pkg/loop/config_test.go |
Updates config parsing/export tests for the new flag. |
pkg/loop/plugin_relayer_emitter_test.go |
Updates noop Beholder client construction with test logger. |
pkg/beholder/config.go |
Adds batch-emitter configuration defaults and clarifies emitter batching docs. |
pkg/beholder/config_test.go |
Updates config example output for new fields. |
pkg/beholder/client.go |
Embeds service lifecycle, creates optional batch sub-service, and refactors close behavior. |
pkg/beholder/client_test.go |
Adds lifecycle coverage for client close and batch sub-service behavior. |
pkg/beholder/batch_emitter_service.go |
Adds the new Chip Ingress batch emitter service implementation. |
pkg/beholder/batch_emitter_service_test.go |
Adds tests and benchmark coverage for batch emitting, callbacks, defaults, and metrics. |
pkg/beholder/chip_ingress_emitter.go |
Moves legacy fire-and-forget behavior into the Chip Ingress emitter. |
pkg/beholder/chip_ingress_emitter_test.go |
Updates legacy emitter tests for async publish and injected logger. |
pkg/beholder/dual_source_emitter.go |
Simplifies dual-source emission to delegate Chip Ingress async behavior. |
pkg/beholder/dual_source_emitter_test.go |
Updates constructor usage and blocking behavior coverage. |
pkg/beholder/noop.go |
Initializes noop/writer clients with service lifecycle and optional logger. |
pkg/beholder/noop_test.go |
Updates noop client tests for lifecycle startup. |
pkg/beholder/httpclient.go |
Initializes HTTP client with service lifecycle and avoids duplicate provider shutdown. |
pkg/beholder/global_test.go |
Updates global Beholder tests for lifecycle-aware clients. |
pkg/beholder/beholdertest/beholder.go |
Uses lifecycle-aware noop client in test observer setup. |
go.mod |
Bumps pkg/chipingress dependency for batch client support. |
go.sum |
Updates checksums for the bumped pkg/chipingress dependency. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // teardown after parent close hook completes. | ||
| chipIngressEmitter = noCloseEmitter{Emitter: batchEmitterService} | ||
| } else { | ||
| chipIngressEmitter, err = NewChipIngressEmitter(chipIngressClient, cfg.ChipIngressLogger) |
| s.beholderClient = beholderClient | ||
| beholder.SetClient(beholderClient) | ||
| beholder.SetGlobalOtelProviders() | ||
|
|
||
| if beholderCfg.LogStreamingEnabled { | ||
| otelLogger, err := NewOtelLogger(beholderClient.Logger, beholderCfg.LogLevel) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to enable log streaming: %w", err) | ||
| } | ||
| s.Logger = logger.Sugared(logger.Named(otelLogger, s.Logger.Name())) | ||
| } | ||
|
|
| } | ||
|
|
||
| func (e *ChipIngressBatchEmitterService) start(ctx context.Context) error { | ||
| e.batchClient.Start(ctx) |
There was a problem hiding this comment.
Oh is this true? Does the batch client hold on to the context? That won't work.
e8165ba to
d3e2a0e
Compare
b4641ea to
0f95004
Compare
Use caller ctx for OTel metric Add calls (non-blocking, tolerates cancelled contexts) and b.Context() in benchmarks.
…nto infoplat-3436-chipingress-batching-part-2
…tartup ctx The services contract forbids retaining the startup context after Start returns. Use eng.NewCtx() to get a lifecycle-owned context that is cancelled when StopChan closes during service shutdown, rather than passing through the caller's startup context.
Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436
Summary
ChipIngressBatchEmitterServicebacked by chipingress batch client, managed as a sub-service of the beholderClientviaservices.EngineDualSourceEmitter.Close()so it delegates fire-and-forget toChipIngressEmitterwithout directly closing the service-managed batch emitterbeholder.Clientdirectly; removed test-only indirection inpkg/loop/server.goand obsoletepkg/loop/server_test.goChipIngressBatchEmitterEnabled), tests, and benchmarkspkg/loopserver and config to propagate batch emitter settingsData Flow
flowchart LR A["Caller or app code"] -->|"Emit(ctx, body, attrs)"| B["beholder.Client.Emitter"] B --> C["DualSourceEmitter"] C -->|"synchronous"| D["OTLP message emitter"] C -->|"batch enabled"| E["ChipIngressBatchEmitterService"] E -->|"queue event"| F["batch.Client"] F -->|"accumulate by size or interval"| G["PublishBatch request"] G --> H["chipingress.Client"] H --> I["Chip Ingress gRPC endpoint"] F --> J["batch client metrics"] E --> K["emitter success or drop metrics"] B --> L["beholder logger, tracer, and meter providers"]Dependency Diagram
flowchart TD S["loop.Server"] -->|"start and close"| BC["beholder.Client"] S --> CFG["loop.EnvConfig"] CFG --> FLAG["CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED"] BC --> ENG1["services.Engine"] BC --> DSE["DualSourceEmitter"] BC --> CHIP["chipingress.Client"] BC --> OTLP["OTLP providers"] ENG1 --> BES["ChipIngressBatchEmitterService"] BES --> ENG2["services.Engine"] BES --> BATCH["batch.Client"] BATCH --> CHIP ENG1 --> CINS["CloseIfNeverStarted"] ENG2 --> CINS CINS --> SVC["services.Config"]Metrics
Emitter-level metrics added in this PR:
chip_ingress.events_sentchip_ingress.events_droppedBatch client metrics (
chip_ingress.batch.*) are introduced in the dependency #2058.Dependencies
Depends on #2058 (
pkg/chipingressbatch client metrics). After #2058 merges, chipingress dependency will be bumped frommain.Split from #1862 (part 2 of 2 — beholder + loop changes).
Supports